package com.huan.hadoop.mr;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * reduce操作： Key（OrderVo）相同的分为一组， 此处 OrderVo 作为key, 分组是根据 TopNGroupingComparator 来实现，
 * 即 订单编号 相同的认为一组
 *
 * @author huan.fu
 * @date 2023/7/13 - 14:29
 */
public class TopNReducer extends Reducer<OrderVo, LongWritable, OrderVo, NullWritable> {

    @Override
    protected void reduce(OrderVo key, Iterable<LongWritable> values, Reducer<OrderVo, LongWritable, OrderVo, NullWritable>.Context context) throws IOException, InterruptedException {
        int topN = 0;
        // 随着每次遍历， key的 orderId 是相同的（因为是根据这个分组的），但是里面的itemId和price是不同的
        for (LongWritable price : values) {
            topN++;
            if (topN > 2) {
                break;
            }
            // 注意： 此处的key每次输出都不一样
            context.write(key, NullWritable.get());
        }
    }
}
